import com.alibaba.fastjson.JSON
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try
import scalikejdbc.config.DBs
import scalikejdbc.{DB, SQL}

/*
                    .::::.
                  .::::::::.
                 :::::::::::
             ..:::::::::::'
           '::::::::::::'		Goddess bless, never a BUG
             .::::::::::
        '::::::::::::::..
             ..::::::::::::.
           ``::::::::::::::::
            ::::``:::::::::'        .:::.
           ::::'   ':::::'       .::::::::.
         .::::'      ::::     .:::::::'::::.
        .:::'       :::::  .:::::::::' ':::::.
       .::'        :::::.:::::::::'      ':::::.
      .::'         ::::::::::::::'         ``::::.
  ...:::           ::::::::::::'              ``::.
 ```` ':.          ':::::::::'                  ::::..
                    '.:::::'                    ':'````..
                    
 ━━━━━━━━━━━━━━━━━━━━ 女神保佑,永无BUG ━━━━━━━━━━━━━━━━━━━━
*/
//离线数据分析
/** Offline analysis of CMCC recharge-request ("sendRechargeReq") events. */
object OffLineAnalysis {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OffLineAnalysis").setMaster("local[2]")
    val sc = new SparkContext(conf)

    try {
      // Load the scalikejdbc connection-pool configuration.
      DBs.setup()

      // Keep only recharge-request events and project the fields we need.
      // The original parsed every line twice (once in filter, once in map);
      // flatMap parses each line exactly once.
      val filteredRDD: RDD[(String, String, String, String)] =
        sc.textFile("dir/data/cmcc.json").flatMap { line =>
          val json = JSON.parseObject(line)
          if (json.getString("serviceName") == "sendRechargeReq") {
            val rst = json.getString("bussinessRst")  // business result code; "0000" = success
            val chargefee = json.getString("chargefee") // recharge amount
            val time = json.getString("endReqTime")   // request end time, used as the business time
            val date = DateUtils.getDateHourMin(time) // "date_hour_minute"
            val code = json.getString("provinceCode") // province code
            Some((rst, chargefee, date, code))
          } else {
            None
          }
        }

      // Reused by several jobs below, so cache it.
      filteredRDD.cache()

      // Province code -> province name, loaded from a space-delimited file.
      val cityInfoRDD: RDD[(String, String)] = sc.textFile("dir/city/city.txt").map { line =>
        val fields = line.split(" ")
        (fields(0), fields(1))
      }

      // Requirement 1: top-3 provinces by recharge failure count (plus failure
      // rate), written to MySQL. Currently disabled.
      // NOTE(review): the original printed a "插入数据表成功" success message here
      // although the insert call was commented out; that misleading message is removed.
      //provinceFailCountTop3(sc, filteredRDD, cityInfoRDD)

      // Requirement 2: per-MINUTE recharge count and amount for each province.
      rechargeDistribution(filteredRDD, cityInfoRDD)

      println("\n*********************************分割线**********************************************************\n")

      // Requirement 3: per-HOUR recharge count and amount for each province.
      // (The original comment wrongly repeated "per minute" here.)
      rechargeDistributionHour(filteredRDD, cityInfoRDD)
    } finally {
      // Release the JDBC pools and the SparkContext even if a job fails.
      DBs.closeAll()
      sc.stop()
    }
  }

  /**
   * Shared implementation for the two distribution reports: aggregates the
   * recharge count and successful-recharge amount per (province, time bucket),
   * joins in the province name and prints one line per group.
   *
   * @param filteredRDD  (resultCode, chargeFee, "date_hour_minute", provinceCode)
   * @param cityInfoRDD  (provinceCode, provinceName)
   * @param toTimeBucket maps the "date_hour_minute" stamp to the grouping bucket
   */
  private def printRechargeStats(filteredRDD: RDD[(String, String, String, String)],
                                 cityInfoRDD: RDD[(String, String)],
                                 toTimeBucket: String => String): Unit = {
    val joined = filteredRDD.map { case (rst, chargefee, date, proCode) =>
      // Only successful requests ("0000") contribute their amount; every
      // request counts as one order. A malformed amount is treated as 0
      // instead of crashing the whole job on a single bad record.
      val fee: Long = if (rst == "0000") Try(chargefee.toLong).getOrElse(0L) else 0L
      (proCode + "," + toTimeBucket(date), (fee, 1L))
    }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)) // sum amount and count per key
      .map { case (key, (fee, count)) =>
        // Re-key by province code so we can join with the province names.
        val Array(proCode, time) = key.split(",", 2)
        (proCode, (time, fee, count))
      }
      .leftOuterJoin(cityInfoRDD) // keep groups whose province code has no name entry

    joined.foreachPartition { it =>
      it.foreach { case (proCode, ((time, fee, count), nameOpt)) =>
        val pro = nameOpt.getOrElse("未知") // province name; "未知" (unknown) when unmatched
        println(s"省份编号：$proCode ,省份名：$pro ,时间：$time ,充值金额：$fee ,充值笔数：$count")
      }
    }
  }

  /**
   * Per-HOUR recharge count and total amount for each province.
   * (The original header comment wrongly said "per minute".)
   */
  def rechargeDistributionHour(filteredRDD: RDD[(String, String, String, String)], cityInfoRDD: RDD[(String, String)]) = {
    // Truncate the "date_hour_minute" stamp down to "date_hour".
    printRechargeStats(filteredRDD, cityInfoRDD, dt => {
      val parts = dt.split("_")
      parts(0) + "_" + parts(1)
    })
  }

  /** Per-minute recharge count and total amount for each province. */
  def rechargeDistribution(filteredRDD: RDD[(String, String, String, String)], cityInfoRDD: RDD[(String, String)]) = {
    // The full "date_hour_minute" stamp is already the wanted bucket.
    printRechargeStats(filteredRDD, cityInfoRDD, identity)
  }

  /**
   * Requirement 1: top-3 provinces by recharge failure count; inserts
   * (provinceName, failCount, failRate) into MySQL table province_fail_count_Top3.
   *
   * @return the number of rows inserted (same Long as the original's `count()`)
   */
  def provinceFailCountTop3(sc: SparkContext, filteredRDD: RDD[(String, String, String, String)], cityInfoRDD: RDD[(String, String)]) = {
    // (provinceCode, (failCount, totalCount)) — any result other than "0000" is a failure.
    val proFailCount: RDD[(String, (Long, Long))] = filteredRDD.map { case (rst, _, _, code) =>
      if (rst != "0000") (code, (1L, 1L)) else (code, (0L, 1L))
    }.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

    // Top 3 provinces by failure count, descending.
    val top3: Array[(String, (Long, Long))] = proFailCount.sortBy(_._2._1, ascending = false).take(3)

    // Only 3 rows survive: join for the names, pull them to the driver and
    // insert in one transaction. The original performed the JDBC insert inside
    // a `map` and triggered it with `count()` — a side effect hidden in a
    // transformation, which re-runs if the RDD is ever recomputed.
    val rows = sc.parallelize(top3).leftOuterJoin(cityInfoRDD).collect()
    DB.autoCommit { implicit session =>
      rows.foreach { case (_, ((failCount, total), nameOpt)) =>
        val province = nameOpt.getOrElse("未找到")
        // Scale to percent BEFORE formatting; the original formatted to 4
        // decimals first and then multiplied by 100, which can yield
        // floating-point noise like "12.340000000000001%".
        val failRate: String = NumberUtils.formatDouble(failCount.toDouble / total * 100, 2) + "%"
        SQL("insert into province_fail_count_Top3(province,failCount,failRate) values(?,?,?)")
          .bind(province, failCount, failRate).update().apply()
      }
    }
    rows.length.toLong // preserves the original's Long result from count()
  }
}
